Kafka >> mail # user >> are kafka consumer apps guaranteed to see msgs at least once?


Re: are kafka consumer apps guaranteed to see msgs at least once?
Why not just disable auto-commit and only call commitOffsets() after you've
processed a batch? It isn't obvious to me how doing so would allow a
message to be processed zero times.
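As a toy model of that ordering (none of the names below are the Kafka API; the log, the offset store, and `step` are all made up for illustration), committing only after processing means a crash between reading and processing simply re-delivers the message on restart:

```scala
// Toy model of at-least-once delivery: the offset is committed only AFTER
// processing. Everything here is illustrative, not the Kafka client API.
object AtLeastOnceSketch {
  val log = Vector("m0", "m1", "m2")
  var committedOffset = 0                   // stand-in for the durable offset store
  var processed = List.empty[String]        // what the app has actually handled

  // One consume step: read the message at the committed offset, process it,
  // then (and only then) advance the committed offset.
  def step(crashBeforeProcess: Boolean = false): Unit = {
    val msg = log(committedOffset)          // next()
    if (crashBeforeProcess) return          // simulated crash: offset NOT moved
    processed ::= msg                       // doSomething()
    committedOffset += 1                    // commitOffsets() afterwards
  }
}
```

Running `step()`, then `step(crashBeforeProcess = true)`, then `step()` again leaves `processed.reverse == List("m0", "m1")`: the message read just before the simulated crash is re-read rather than lost, at the cost of possibly processing it more than once.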
On Nov 21, 2013 5:52 PM, "Imran Rashid" <[EMAIL PROTECTED]> wrote:

> Hi Edward,
>
> I think you misunderstand ... I definitely do *not* want to commit
> every message.  That would be much too expensive, every few minutes is
> plenty for me.
>
> I want to guarantee that if I commit a message, that my message has
> been processed by *my application* at least once.  (In another thread
> I mentioned something about reading exactly once, but that is not what
> I am going for here -- more than once is OK, as long as it's at least
> once.)  That requires me to make sure that a commit never happens
> between a call to next() and doSomething().  Kafka guarantees that
> messages get read off the queue at least once, but as I outlined
> above, a naive solution allows for messages to get read off the queue,
> but not make it into my app.
>
> Not only do I not want to commit every message -- I'd really like to
> have the above guarantee without even acquiring a lock (most of the
> time).  That is what I was getting at with the comment about the
> spin-lock, just as an idea of how to prevent a commit between next()
> and doSomething().  I dunno if the spin-lock was really the right
> idea, that was just a random thought, but the point is, we want some
> check that should be very cheap for 99.999% of the time when a commit
> isn't happening, but can still guarantee proper ordering during that
> 0.001% of the time when a commit is happening.  Otherwise, messages
> might never make it to my app.  (And this is all just to prevent a
> message getting lost during the 10^-6% of the time when a commit might
> happen between next() and doSomething(), and the app dies before
> doSomething completes!)
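That cheap-guard idea can be sketched with a single CAS-based flag (a hypothetical `SpinGuard`, not anything in Kafka): uncontended, the consumer pays one compare-and-set per message, and the committer only spins during the rare window when a message is mid-flight.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Toy sketch of the spin-lock idea from the mail: one shared flag guards the
// next()+doSomething() pair against a concurrent commit. All names here are
// made up for illustration; this is not Kafka's consumer internals.
class SpinGuard {
  private val busy = new AtomicBoolean(false)

  private def locked[A](body: => A): A = {
    while (!busy.compareAndSet(false, true)) Thread.onSpinWait()
    try body finally busy.set(false)
  }

  // Consumer path: read and process atomically with respect to commits.
  def processNext[A](next: => A)(doSomething: A => Unit): Unit =
    locked { doSomething(next) }

  // Committer path: runs only between messages, never between
  // next() and doSomething().
  def commit(commitOffsets: => Unit): Unit = locked { commitOffsets }
}
```

The hot path costs one uncontended compare-and-set per message; the commit path can only observe a state where the in-flight message, if any, has finished processing.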
>
> Actually after a bit more thought -- the best way to guarantee this
> would be with a small api change, that would do away with the need for
> locks completely.  The iterator should have a method applyToNext:
>
> def applyToNext(f: MessageAndMetadata[K,V] => Unit) {
>   // all the stuff in next(), *except*
>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
>   val item = ...
>   f(item)
>   // after we've applied the user's function, now we can update the
>   // offset that should get committed
>   currentTopicInfo.resetConsumeOffset(consumedOffset)
> }
>
> I think this way, you could avoid any problems even with auto-commit.
> Or, if you don't want to add a new method, so we can stick to the
> Iterator api, then maybe the iterator could let you register a
> preCommitFunction, so next() would change to:
>
> override def next(): MessageAndMetadata[K, V] = {
>     ...
>     val item = ...
>     preCommitFunctions.foreach{f => f(item)}
>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>     ...
>     item
>   }
>
>
> thanks,
> Imran
>
> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <[EMAIL PROTECTED]>
> wrote:
> > You likely need to use a custom offset solution if you plan on committing
> > every message. With many partitions this puts a large burden on ZooKeeper;
> > you end up needing to roll over your zk transaction logs quickly as well,
> > or risk filling up the disk.
> >
> >
> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <[EMAIL PROTECTED]>
> wrote:
> >
> >> Hello Imran,
> >>
> >> The offset will only be updated when the next() function is called:
> >>
> >> override def next(): MessageAndMetadata[K, V] = {
> >>     ...
> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >>     ...
> >>     item
> >>   }
> >>
> >> instead of in makeNext(), which will just update consumedOffset, but that
> >> is not the value that will be committed using the commitOffset call. So as
> >> long as you turn off auto-commit and only call commitOffset after the
> >> process(msg) call, not after the
> >>
> >> b = iter.next()
> >>
> >> is called, at-least-once is guaranteed.
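Imran's `applyToNext` proposal earlier in the thread can be modeled with a toy iterator (all names hypothetical; this is not Kafka's `ConsumerIterator`): the consume offset is advanced only after the user's function returns, so a failure inside `f` leaves the commit-eligible offset still pointing at the unprocessed message.

```scala
// Toy sketch of the proposed applyToNext semantics. The fetch position moves
// when a message is read, but consumeOffset -- the value a commit would
// persist -- moves only after f(item) completes. Illustrative names only.
class ToyIterator[A](items: Vector[A]) {
  private var fetchPos = 0      // what makeNext() would track internally
  var consumeOffset = 0         // what commitOffset() would persist

  def applyToNext(f: A => Unit): Unit = {
    val item = items(fetchPos)  // everything next() does, *except*
    fetchPos += 1               // ...resetting the consume offset
    f(item)                     // the user's processing
    consumeOffset = fetchPos    // only now is the offset commit-eligible
  }
}
```

If `f` throws, `consumeOffset` is untouched, so a commit followed by a restart re-delivers the failed message; this is the guarantee the proposal aims for without any locking on the hot path.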

 