Kafka >> mail # user >> Getting fixed amount of messages using Zookeeper based consumer


Re: Getting fixed amount of messages using Zookeeper based consumer
Hello,

Sorry for doing thread necromancy on this one, but I have a little question,
hehe... Can you please confirm whether my understanding below is correct?

   1. Every time I extract a message from a KafkaMessageStream, it sets my
   consumer offset to the offset of the beginning of the message I just
   extracted.
   2. If I shut down my consumer (and call commitOffsets), then the offset
   kept in memory by the KafkaMessageStream will be committed to ZK.
   3. This means that if my program persists the message I just extracted
   into a datastore, and then shuts down the consumer before I extract the
   next message, then, the next time I start my consumer, I will begin from
   the offset of the beginning of that same message, extract it again, and I
   will thus end up persisting a duplicate of that message in my datastore.

This is what my testing seems to demonstrate...

I have found a way to account for this behavior in my current use case, but
I wanted to know if the behavior I describe above is normal and intended,
or if perhaps I'm doing something weird that could be causing unexpected
behavior.

The behavior I describe makes sense for "at least once" guarantees of
delivery. I guess the alternative implementation would have been to set the
consumer offset in ZK to that of the next message, whether the current
iteration succeeds or not, which would have given "at most once" guarantees
(as far as that part of the system is concerned anyway).
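
Just to make that concrete, here is a rough sketch of what I mean (not my
actual code; it assumes the same ConsumerConnector / KafkaStream setup as in
the quoted sample further down, and persistToDatastore() is just a placeholder
for whatever the application does with each message):

    for (MessageAndMetadata<Message> msgAndMetadata : stream) {
        // persist first, then commit: if the process dies between these two
        // calls, this message will be re-delivered (and re-persisted) on the
        // next start, i.e. "at least once"
        persistToDatastore(msgAndMetadata.message());
        consumerConnector.commitOffsets();
    }

    // committing *before* persisting would instead give "at most once":
    // a crash between the commit and the persist would lose that message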

So, yeah, I'd like it if someone could confirm or deny my above
interpretation.

And also, I have another related question: say that a consumer were to die
without being shut down gracefully (and without calling commitOffsets), then
would the offset stored in ZK be the one that was put there the last time
the autocommit.interval.ms elapsed, thus causing potentially even more
duplicate messages the next time the consumer is started? (This is assuming
the default settings where autocommit.enable is true.)
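
For reference, these are the consumer settings I mean (the values below are
made up for illustration; autocommit.enable defaults to true, and I believe
autocommit.interval.ms defaults to 10000 ms, but please correct me if I'm
reading the docs wrong):

    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");     // made-up ZK address
    props.put("groupid", "my-consumer-group");     // made-up group id
    props.put("autocommit.enable", "true");        // the default
    props.put("autocommit.interval.ms", "10000");  // commit offsets to ZK every 10s
    ConsumerConfig consumerConfig = new ConsumerConfig(props);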

Thanks in advance :) ...!

--
Felix

On Thu, Jul 12, 2012 at 10:17 AM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Yes, it knows. The consumer offset is only advanced every time a message is
> iterated over.
>
> Thanks,
>
> Jun
>
> On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <[EMAIL PROTECTED]
> >wrote:
>
> > The inner loop keeps running. If I break it in the middle, is the Kafka
> > broker going to know that the rest of the messages in the stream were not
> > delivered?
> >
> > Regards,
> > Vaibhav
> > GumGum
> > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <[EMAIL PROTECTED]> wrote:
> >
> > > Hi all,
> > >
> > > Is there any way to get a fixed amount of messages using Zookeeper
> based
> > > consumer (ConsumerConnector)?
> > >
> > > I know that with SimpleConsumer you can pass fetchSize as an argument
> and
> > > limit the number of messages coming back.
> > >
> > > This sample code creates 4 threads that keep consuming forever.
> > >
> > >
> > > // consume the messages in the threads
> > > for(final KafkaStream<Message> stream: streams) {
> > >   executor.submit(new Runnable() {
> > >     public void run() {
> > >       for(MessageAndMetadata msgAndMetadata: stream) {
> > >         // process message (msgAndMetadata.message())
> > >       }
> > >     }
> > >   });
> > > }
> > >
> > > Regards,
> > > Vaibhav
> > >
> > >
> > >
> >
>