Kafka >> mail # user >> are kafka consumer apps guaranteed to see msgs at least once?


Re: are kafka consumer apps guaranteed to see msgs at least once?
Why not just disable auto-commit and only call commitOffsets() after you've
processed a batch? It isn't obvious to me how doing so would allow a
message to be processed zero times.
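Here is a minimal sketch of that suggestion. It uses a toy single-partition consumer rather than Kafka's real classes. `ToyPartitionConsumer`, `consumeInBatches` and the `crashAfter` parameter are hypothetical names; only `commitOffsets()` mirrors the call discussed in this thread. The loop commits after each full batch, and a simulated crash shows that a restart replays uncommitted messages instead of skipping them.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for one partition of a consumer stream (NOT the Kafka API):
// next() advances the consumed offset, and commitOffsets() persists it.
final class ToyPartitionConsumer(log: Vector[String], startOffset: Int) {
  private var consumed = startOffset
  var committed: Int = startOffset

  def hasNext: Boolean = consumed < log.length
  def next(): String = { val msg = log(consumed); consumed += 1; msg }
  def commitOffsets(): Unit = committed = consumed
}

// Auto-commit is "off": commit only after a whole batch has been processed.
// `crashAfter` (hypothetical) simulates the process dying after that many messages.
def consumeInBatches(log: Vector[String], start: Int, batchSize: Int,
                     processed: ArrayBuffer[String],
                     crashAfter: Int = Int.MaxValue): Int = {
  val consumer = new ToyPartitionConsumer(log, start)
  var inBatch = 0
  var handled = 0
  while (consumer.hasNext && handled < crashAfter) {
    processed += consumer.next() // stands in for process(msg)
    handled += 1
    inBatch += 1
    if (inBatch == batchSize) { consumer.commitOffsets(); inBatch = 0 }
  }
  if (handled < crashAfter) consumer.commitOffsets() // clean exit: commit the partial batch
  consumer.committed
}

val seen = ArrayBuffer.empty[String]
val messages = Vector("a", "b", "c", "d", "e")
val afterCrash = consumeInBatches(messages, 0, 2, seen, crashAfter = 3)
val afterRestart = consumeInBatches(messages, afterCrash, 2, seen)
// "c" was processed before the crash but never committed, so it is seen twice;
// no message is skipped.
```

The cost of this design is duplicate processing of at most one batch after a crash, which is exactly the at-least-once trade-off.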
On Nov 21, 2013 5:52 PM, "Imran Rashid" <[EMAIL PROTECTED]> wrote:

> Hi Edward,
>
> I think you misunderstand: I definitely do *not* want to commit
> every message.  That would be much too expensive; committing every few
> minutes is plenty for me.
>
> I want to guarantee that if I commit a message, that message has
> been processed by *my application* at least once.  (In another thread
> I mentioned something about reading exactly once, but that is not what
> I am going for here; more than once is OK, as long as it's at least
> once.)  That requires me to make sure that a commit never happens
> between a call to next() and doSomething().  Kafka guarantees that
> messages get read off the queue at least once, but as I outlined
> above, a naive solution allows a message to be read off the queue
> without ever making it into my app.
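The race described above can be replayed deterministically in a toy model. This sketch assumes a simplified stream whose `next()` advances the committable offset immediately, which is the behaviour Guozhang describes further down. `ToyStream` and `badInterleaving` are hypothetical names. The "auto-commit" here is simply a read of the offset at the unlucky moment.

```scala
// Toy model (NOT Kafka's classes): next() advances consumedOffset immediately,
// which is the value an auto-commit thread would persist.
final class ToyStream(log: Vector[String]) {
  var consumedOffset = 0
  def next(): String = { val m = log(consumedOffset); consumedOffset += 1; m }
}

// Replays the bad interleaving step by step on one thread, so it is deterministic.
def badInterleaving(log: Vector[String]): (Int, Vector[String], String) = {
  val stream = new ToyStream(log)
  var handled = Vector.empty[String]
  handled :+= stream.next()             // m0: read and passed to doSomething()
  val inFlight = stream.next()          // m1: read off the queue...
  val committed = stream.consumedOffset // ...auto-commit fires here and stores 2...
  // ...then the app dies before doSomething(inFlight) runs.
  (committed, handled, inFlight)
}

val queue = Vector("m0", "m1", "m2")
val (committed, handledBeforeCrash, lost) = badInterleaving(queue)
val handledAfterRestart = queue.drop(committed) // a restart resumes at the committed offset
// "m1" is in neither handledBeforeCrash nor handledAfterRestart: it was read but never processed.
```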
>
> Not only do I not want to commit every message, I'd really like to
> have the above guarantee without even acquiring a lock (most of the
> time).  That is what I was getting at with the comment about the
> spin-lock, just as an idea of how to prevent a commit between next()
> and doSomething().  I dunno if the spin-lock was really the right
> idea (that was just a random thought), but the point is that we want
> some check that is very cheap the 99.999% of the time when a commit
> isn't happening, but can still guarantee proper ordering during the
> 0.001% of the time when a commit is happening.  Otherwise, messages
> might never make it to my app.  (And this is all just to prevent a
> message getting lost during the 10^-6% of the time when a commit might
> happen between next() and doSomething(), and the app dies before
> doSomething() completes!)
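A conventional way to get this ordering guarantee is an ordinary lock rather than the spin-lock mentioned above. This is a swapped-in technique and only a sketch. It assumes a `java.util.concurrent.locks.ReentrantLock` held across next() plus doSomething(), with the committer taking the same lock. `GuardedConsumer` and `runGuarded` are hypothetical. An uncontended `ReentrantLock` acquire is usually cheap, but it is still a lock, which is what Imran hoped to avoid.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

// Sketch only: the consumer holds `guard` across next() + doSomething(), and the
// committer takes the same lock, so a commit can never land between the two.
final class GuardedConsumer(total: Long) {
  private val guard = new ReentrantLock()
  private var consumedOffset = 0L // advanced by next()
  private var processedUpTo = 0L  // advanced once doSomething() finishes
  val violations = new AtomicLong(0)

  /** One next() + doSomething() step; returns false when the log is exhausted. */
  def consumeOne(): Boolean = {
    guard.lock()
    try {
      if (consumedOffset >= total) false
      else {
        consumedOffset += 1 // next()
        processedUpTo += 1  // doSomething(msg)
        true
      }
    } finally guard.unlock()
  }

  /** Simulated commitOffsets(): counts a violation if it sees an unprocessed message. */
  def commit(): Long = {
    guard.lock()
    try {
      if (consumedOffset != processedUpTo) violations.incrementAndGet()
      consumedOffset
    } finally guard.unlock()
  }
}

def runGuarded(total: Long): (Long, Long) = {
  val consumer = new GuardedConsumer(total)
  val done = new AtomicBoolean(false)
  // Stand-in for the auto-commit thread: commits as fast as it can.
  val committer = new Thread(() => while (!done.get()) consumer.commit())
  committer.start()
  while (consumer.consumeOne()) ()
  done.set(true)
  committer.join()
  (consumer.commit(), consumer.violations.get())
}

val (finalOffset, violationCount) = runGuarded(100000L)
```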
>
> Actually, after a bit more thought, the best way to guarantee this
> would be with a small API change that would do away with the need for
> locks completely.  The iterator should have a method applyToNext:
>
> def applyToNext(f: MessageAndMetadata[K, V] => Unit): Unit = {
>   // all the stuff in next(), *except*
>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
>   val item = ...
>   f(item)
>   // after we've applied the user's function, we can update the
>   // offset that should get committed
>   currentTopicInfo.resetConsumeOffset(consumedOffset)
> }
>
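The proposed applyToNext can be modeled outside Kafka to check the property it claims. In this sketch `ApplyToNextIterator` and `commitableOffset` are hypothetical stand-ins for the consumer iterator and the value set by `resetConsumeOffset`. The model shows that the committable offset only advances after `f` returns, so a failure inside `f` leaves the message uncommitted.

```scala
import scala.util.Try

// Toy model of the proposed applyToNext (a hypothetical method, not Kafka API):
// commitableOffset stands in for the value set by resetConsumeOffset, and it
// only moves once the user's function has returned normally.
final class ApplyToNextIterator[T](log: Vector[T]) {
  private var consumedOffset = 0
  @volatile var commitableOffset = 0 // what an auto-commit thread would persist

  def hasNext: Boolean = consumedOffset < log.length

  def applyToNext(f: T => Unit): Unit = {
    if (!hasNext) throw new NoSuchElementException("no more messages")
    val item = log(consumedOffset) // everything next() does, except the offset reset
    consumedOffset += 1
    f(item)                        // if f throws, commitableOffset does not move
    commitableOffset = consumedOffset
  }
}

val it = new ApplyToNextIterator(Vector("a", "b", "c"))
it.applyToNext(_ => ())  // "a" processed: offset 1 is now committable
val crashed = Try(it.applyToNext(_ => throw new RuntimeException("app died")))
// commitableOffset is still 1, so a restart redelivers "b" instead of skipping it
```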
> Done this way, I think you could avoid any problems even with
> auto-commit.  Or, if you'd rather not add a new method so that we can
> stick to the Iterator API, the iterator could let you register a
> preCommitFunction, and next() would change to:
>
> override def next(): MessageAndMetadata[K, V] = {
>   ...
>   val item = ...
>   preCommitFunctions.foreach { f => f(item) }
>   // the offset is only reset after every pre-commit function has run
>   currentTopicInfo.resetConsumeOffset(consumedOffset)
>   ...
>   item
> }
>
>
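The preCommitFunction alternative keeps the standard `Iterator` interface. Here is a minimal sketch that assumes a hypothetical `registerPreCommit` method on a toy `HookedIterator`; neither exists in Kafka. Hooks run inside `next()` before the committable offset moves, and a hook that throws leaves the offset where it was.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Sketch of the Iterator-compatible variant: callers register pre-commit hooks,
// and next() runs them before it moves the committable offset.
// registerPreCommit is a hypothetical name, not an existing Kafka method.
final class HookedIterator[T](log: Vector[T]) extends Iterator[T] {
  private var consumedOffset = 0
  @volatile var commitableOffset = 0 // what an auto-commit thread would persist
  private val preCommitFunctions = ArrayBuffer.empty[T => Unit]

  def registerPreCommit(f: T => Unit): Unit = preCommitFunctions += f

  override def hasNext: Boolean = consumedOffset < log.length

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more messages")
    val item = log(consumedOffset)
    consumedOffset += 1
    preCommitFunctions.foreach(f => f(item)) // user processing runs here
    commitableOffset = consumedOffset        // only now can the offset be committed
    item
  }
}

val seen = ArrayBuffer.empty[String]
val hooked = new HookedIterator(Vector("x", "y"))
hooked.registerPreCommit(seen += _)
val drained = hooked.toList // runs the hook for each message

val failing = new HookedIterator(Vector("z"))
failing.registerPreCommit(_ => throw new IllegalStateException("app died"))
val outcome = Try(failing.next()) // failing.commitableOffset stays at 0
```

Because the hooks run inside `next()`, any code that merely iterates (such as `toList` above) also triggers processing. That is the price of staying within the plain Iterator API.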
> thanks,
> Imran
>
> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <[EMAIL PROTECTED]> wrote:
> > You likely need to use a custom offset solution if you plan on committing
> > every message. With many partitions this puts a large burden on ZooKeeper;
> > you end up needing to roll over your ZK transaction logs quickly as well, or
> > risk filling up the disk.
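A common way to avoid one ZooKeeper write per message, short of a fully custom offset store, is to commit when a count or time threshold is reached. This fits Imran's goal of committing "every few minutes". The sketch rests on several assumptions: `CommitThrottle` is a hypothetical helper, the thresholds are illustrative, and the clock is injected so the example is deterministic.

```scala
// Hypothetical helper (not part of Kafka): decides when to call commitOffsets()
// so ZooKeeper sees one write per N messages or per time window, not one per message.
final class CommitThrottle(everyMessages: Int, everyMillis: Long, now: () => Long) {
  require(everyMessages > 0 && everyMillis > 0, "limits must be positive")
  private var sinceLastCommit = 0
  private var lastCommitAt = now()

  /** Call after each message is fully processed; true means "commit now". */
  def processed(): Boolean = {
    sinceLastCommit += 1
    val t = now()
    if (sinceLastCommit >= everyMessages || t - lastCommitAt >= everyMillis) {
      sinceLastCommit = 0
      lastCommitAt = t
      true
    } else false
  }
}

var fakeClock = 0L // injected clock so the example is deterministic
val throttle = new CommitThrottle(1000, 60000L, () => fakeClock)
val commitsByCount = (1 to 2500).count(_ => throttle.processed()) // at messages 1000 and 2000
fakeClock += 60000L
val commitByTime = throttle.processed() // a full minute has passed since the last commit
```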
> >
> >
> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <[EMAIL PROTECTED]> wrote:
> >
> >> Hello Imran,
> >>
> >> The offset will only be updated when the next() function is called:
> >>
> >> override def next(): MessageAndMetadata[K, V] = {
> >>   ...
> >>   currentTopicInfo.resetConsumeOffset(consumedOffset) // offset updated here
> >>   ...
> >>   item
> >> }
> >>
> >> instead of in makeNext(), which only updates consumedOffset; that is
> >> not the value that gets committed by the commitOffsets() call. So as
> >> long as you turn off auto-commit and only call commitOffsets() after the
> >> process(msg) call, rather than right after
> >>
> >> b = iter.next()
> >>
> >> is called, at-least-once is guaranteed.
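The makeNext()/next() split Guozhang describes can be sketched with a toy iterator. `ToyTopicInfo` and `ToyConsumerIterator` are hypothetical simplifications, not Kafka's source; they only reuse the names from the thread (`makeNext`, `consumedOffset`, `resetConsumeOffset`). In this model, calling `hasNext` prefetches a message without moving the committable offset, and only `next()` moves it.

```scala
// Toy model (not Kafka's source): makeNext(), reached via hasNext, only records
// consumedOffset; next() publishes it through resetConsumeOffset, and that
// published value is what a commit would write.
final class ToyTopicInfo {
  @volatile var consumeOffset = 0L // the value a commit would persist
  def resetConsumeOffset(offset: Long): Unit = consumeOffset = offset
}

final class ToyConsumerIterator(log: Vector[String], topicInfo: ToyTopicInfo)
    extends Iterator[String] {
  private var position = 0
  private var consumedOffset = 0L
  private var prefetched: Option[String] = None

  private def makeNext(): Option[String] =
    if (position < log.length) {
      val msg = log(position)
      position += 1
      consumedOffset = position.toLong // recorded, but not yet committable
      Some(msg)
    } else None

  override def hasNext: Boolean = {
    if (prefetched.isEmpty) prefetched = makeNext()
    prefetched.isDefined
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("end of log")
    val item = prefetched.get
    prefetched = None
    topicInfo.resetConsumeOffset(consumedOffset) // only now does the commit target move
    item
  }
}

val info = new ToyTopicInfo
val iter = new ToyConsumerIterator(Vector("a", "b"), info)
val more = iter.hasNext // prefetches "a" via makeNext()
val offsetAfterHasNext = info.consumeOffset
val first = iter.next()
val offsetAfterNext = info.consumeOffset
```

Under this model, calling commitOffsets() only after process(first) means a crash can at worst replay messages that were already handed out, never skip one.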