-
Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 00:05

Hi all,

Is there any way to get a fixed amount of messages using Zookeeper based consumer (ConsumerConnector)?

I know that with SimpleConsumer you can pass fetchSize as an argument and limit the number of messages coming back.

This sample code creates 4 threads that keep consuming forever.

    // consume the messages in the threads
    for (final KafkaStream<Message> stream : streams) {
        executor.submit(new Runnable() {
            public void run() {
                for (MessageAndMetadata msgAndMetadata : stream) {
                    // process message (msgAndMetadata.message())
                }
            }
        });
    }

Regards,
Vaibhav
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Jun Rao 2012-07-12, 04:06

Well, you can control how many messages to iterate in the application.

Thanks,
Jun
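A minimal sketch of what "control how many messages to iterate in the application" could look like. It works on a plain java.util.Iterator so that it runs without Kafka on the classpath; the class name BatchReader and the method takeBatch are hypothetical and not part of any Kafka API. With the real consumer, the iterator obtained from a KafkaStream would presumably take the place of the list iterator used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Kafka: reads a bounded batch from any iterator.
final class BatchReader {

    /** Pulls at most maxMessages items from the iterator and returns them. */
    static <T> List<T> takeBatch(Iterator<T> messages, int maxMessages) {
        List<T> batch = new ArrayList<>();
        // Check the size first so hasNext() is never called once the batch is full;
        // on a blocking stream iterator, hasNext() may wait for more data.
        while (batch.size() < maxMessages && messages.hasNext()) {
            batch.add(messages.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for a message stream.
        Iterator<String> stream =
                Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6", "m7").iterator();

        System.out.println(takeBatch(stream, 5)); // [m1, m2, m3, m4, m5]
        System.out.println(takeBatch(stream, 5)); // [m6, m7]
    }
}
```

The same iterator is reused across calls, so each call continues where the previous batch stopped instead of starting over.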
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 05:03

The inner loop keeps running. If I break out of it in the middle, is the Kafka broker going to know that the rest of the messages in the stream were not delivered?

Regards,
Vaibhav
GumGum
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Jun Rao 2012-07-12, 14:17

Yes, it knows. The consumer offset is only advanced every time a message is iterated over.

Thanks,
Jun
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 16:34

I tried breaking the loop in the middle and here is what I got:

    10158 [FetchRunnable-0] INFO kafka.consumer.SimpleConsumer - Reconnect in multifetch due to socket error:
    java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:270)
        at kafka.utils.Utils$.read(Utils.scala:538)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
        at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
        at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
    10163 [FetchRunnable-0] INFO kafka.consumer.FetcherRunnable - FecherRunnable Thread[FetchRunnable-0,5,main] interrupted

I get this exception once per broker. I get three exceptions because we have three brokers.

I am calling KafkaStream.clear() just before breaking the loop.

Is there any way to break the stream cleanly? Or am I just supposed to catch this exception?
(My fetch size is 1 MB and my batch size is small for now: 5. But I don't think we can ever match batch size with fetch size accurately, because message size is not fixed.)

Regards,
Vaibhav
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Neha Narkhede 2012-07-12, 17:26

Vaibhav,

The cleaner way of exiting the consumer iterator loop is by calling the shutdown() API on the ZookeeperConsumerConnector. But probably there is a reason you are not able to use that approach?

Thanks,
Neha
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 18:24

Neha,

I tried calling shutdown on ConsumerConnector. The only problem I have with it is that it forces me to close the connection, so I have to reopen the connection every single time I want to fetch a new set of messages.
Here is my sample code - http://pastebin.com/6NBHtteL

But it's not a big deal. I am writing a Zookeeper based KafkaSpout. It's possible for me to open the connection every time. I don't think it's going to be a huge performance problem at our scale.

I am still getting those exceptions in spite of calling ConsumerConnector.shutdown. *But I noticed that the exception is caught and handled by Kafka code. It's being logged by Kafka at INFO log level.* It lets me continue in spite of the exception.

Ideally I would have liked a cleaner INFO log. This puts a stack trace in the INFO log, which should be avoided if possible. If you want, I can file a JIRA issue to get rid of it.

Regards,
Vaibhav
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Jun Rao 2012-07-12, 18:33

If you don't want to shut down the connector each time, you can also set consumer.timeout.ms. This way, the iterator's hasNext() will throw an exception if no new messages are available after the timeout. The iterator is re-enterable.

Thanks,
Jun
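A rough model of the timeout idea, combined with a count limit. It does not use the Kafka classes: a java.util.concurrent.BlockingQueue stands in for the stream, and a timed poll() that returns null stands in for the exception that hasNext() would throw once consumer.timeout.ms elapses. The names TimedBatchReader and takeBatch are hypothetical. The point is only the control flow: a batch ends when it is full or when the wait times out, and the same source can be read again afterwards ("re-enterable").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in, not Kafka code: batches bounded by count and by wait time.
final class TimedBatchReader {

    /** Returns up to maxMessages items, stopping early if none arrives within timeoutMs. */
    static <T> List<T> takeBatch(BlockingQueue<T> source, int maxMessages, long timeoutMs)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxMessages) {
            T next = source.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                break; // timed out: hand back what we have, the source stays usable
            }
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source =
                new LinkedBlockingQueue<>(Arrays.asList("m1", "m2", "m3"));

        System.out.println(takeBatch(source, 2, 100)); // [m1, m2]  (batch full)
        System.out.println(takeBatch(source, 2, 100)); // [m3]      (timed out)

        source.add("m4"); // new data arrives later
        System.out.println(takeBatch(source, 2, 100)); // [m4]      (re-entered)
    }
}
```

When messages are always available the timeout branch never fires, so the count limit is what actually bounds each batch.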
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Vaibhav Puranik 2012-07-12, 18:40

Jun,

The messages are always available. My requirement is to get them in batches (without worrying about offsets) so that I can do batch aggregation. I want to mark each batch with a batch id so that my bolts can commit the aggregated results to the database.

Thanks for your quick reply,
Vaibhav
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Felix GV 2012-09-15, 01:52

Hello,

Sorry for doing thread necromancy on this one, but I have a little question hehe... Can you confirm whether my understanding, below, is correct please?

1. Every time I extract a message from a KafkaMessageStream, it sets my consumer offset to the offset of the beginning of the message I just extracted.
2. If I shut down my consumer (and call commitOffsets), then the offset kept in memory by the KafkaMessageStream will be committed to ZK.
3. This means that if my program persists the message I just extracted into a datastore, and then shuts down the consumer before I extract the next message, then, the next time I start my consumer, I will begin from the offset of the beginning of that same message, extract it again, and I will thus end up persisting a duplicate of that message in my datastore.

This is what my testing seems to demonstrate...

I have found a way to account for this behavior in my current use case, but I wanted to know if the behavior I describe above is normal and intended, or if perhaps I'm doing something weird that could be causing unexpected behavior.

The behavior I describe makes sense for "at least once" guarantees of delivery. I guess the alternative implementation would have been to set the consumer offset in ZK to that of the next message, whether the current iteration succeeds or not, which would have given "at most once" guarantees (as far as that part of the system is concerned anyway).

So, yeah, I'd like it if someone could confirm or deny my above interpretation.

And also, I have another related question: say that a consumer were to die without being shut down gracefully (and without calling commitOffsets), then would the offset stored in ZK be the one that was put there the last time the autocommit.interval.ms elapsed, thus causing potentially even more duplicate messages the next time the consumer is started? (This is assuming the default settings where autocommit.enable is true.)

Thanks in advance :) ...!

--
Felix

On Thu, Jul 12, 2012 at 10:17 AM, Jun Rao wrote:
> Yes, it knows. The consumer offset is only advanced every time a message is
> iterated over.
-
Re: Getting fixed amount of messages using Zookeeper based consumer
Jun Rao 2012-09-15, 05:39

Actually, every time you consume a message, the offset moves to the beginning of the next message.

Thanks,
Jun
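A toy model of the offset behavior discussed in the last two messages, assuming only what is stated there: consuming a message moves the in-memory offset to the next message, and a commit copies that in-memory offset to durable storage. The class OffsetModel and its methods are hypothetical, message positions are list indexes rather than byte offsets, and a plain field stands in for ZooKeeper. The second half of main covers Felix's crash scenario under the assumption his question makes, namely that a restart resumes from the last committed offset.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Kafka code: in-memory position vs. committed offset.
final class OffsetModel {
    private final List<String> log; // the messages of one partition
    private int position = 0;       // in-memory offset: next message to hand out
    private int committed = 0;      // durable offset (stand-in for the value kept in ZK)

    OffsetModel(List<String> log) {
        this.log = log;
    }

    /** Returns the current message and moves the offset to the next one. */
    String consume() {
        return log.get(position++);
    }

    /** Stores the in-memory offset durably. */
    void commitOffsets() {
        committed = position;
    }

    /** Simulates a restart: the consumer resumes from the committed offset. */
    void restartFromCommitted() {
        position = committed;
    }

    public static void main(String[] args) {
        OffsetModel consumer = new OffsetModel(Arrays.asList("m0", "m1", "m2", "m3"));

        // Clean shutdown: consume two messages, commit, restart.
        consumer.consume();
        consumer.consume();
        consumer.commitOffsets();
        consumer.restartFromCommitted();
        System.out.println(consumer.consume()); // m2 (no duplicate of m1)

        // Crash: m2 was consumed above but never committed, so it comes back.
        consumer.restartFromCommitted();
        System.out.println(consumer.consume()); // m2 again (duplicate)
    }
}
```

In this model duplicates appear only for messages consumed after the last commit, which is why an application that needs exactly one copy per message still has to deduplicate on its own side.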