Kafka, mail # user - are kafka consumer apps guaranteed to see msgs at least once?


Re: are kafka consumer apps guaranteed to see msgs at least once?
Jason Rosenberg 2013-11-23, 11:42
Interesting, but I think you might run into problems when partitions get
rebalanced between processes (not just between threads in the same process).

So you'd need to coordinate your rule against committing backward across
threads running in different processes.  I'm trying to think now what the
worst case might be if you commit an offset lower than one that was
already committed by another thread/process.  I guess the only danger is
truly unnecessary duplicate processing?  Or is there an actual danger that
the system might flail and never move forward!?

Are you attempting to add commitOffsetForPartition(partition, offset)
yourself, as a mod to the ZookeeperConsumerConnector?  Another thing that's
missing is some sort of callback from the ConsumerConnector to notify the
app when a rebalance is happening.  That would help consumer apps that
don't use the default autocommit be a bit more proactive in that case.
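
A rebalance notification of the kind described above could look roughly like the sketch below. It is hypothetical: `RebalanceAwareConsumer`, `rebalance()` and the `on_revoked` callback are illustrative names, not part of the ZookeeperConsumerConnector API discussed in this thread, and partitions are modeled as plain integers.

```python
from typing import Callable, Dict, Iterable, Set


class RebalanceAwareConsumer:
    """Hypothetical wrapper that notifies the app before partitions move away."""

    def __init__(self, on_revoked: Callable[[Set[int]], None]) -> None:
        self._on_revoked = on_revoked
        self.owned: Set[int] = set()

    def rebalance(self, new_assignment: Iterable[int]) -> None:
        new_owned = set(new_assignment)
        lost = self.owned - new_owned
        if lost:
            # Let the app act (e.g. commit progress) while it still owns these.
            self._on_revoked(lost)
        self.owned = new_owned


# Usage: commit the last processed position of any partition being taken away.
processed: Dict[int, int] = {0: 42, 1: 17}  # partition -> next offset to read
committed: Dict[int, int] = {}


def commit_before_losing(partitions: Set[int]) -> None:
    for p in partitions:
        committed[p] = processed[p]


consumer = RebalanceAwareConsumer(commit_before_losing)
consumer.rebalance([0, 1])  # initial assignment: nothing is revoked
consumer.rebalance([1])     # partition 0 moves to another consumer
print(committed)            # {0: 42}
```

The callback runs before ownership changes, which is the window in which a manual-commit app could still safely record its progress.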

Interesting stuff....

Jason
On Sat, Nov 23, 2013 at 12:55 AM, Imran Rashid <[EMAIL PROTECTED]> wrote:

> MessageAndMetadata includes the partition and offset along with each
> message.  So this is enough for the workers to track which partitions
> they need to update, and by what amount.  There is the possibility
> that you'd commit an update for a partition that you *used* to own,
> but that has since been rebalanced elsewhere.  But this is actually OK,
> as long as we protect against commits moving the offset backwards.
> E.g., we need to prevent Thread A from committing an offset of 60 after
> B has already committed an offset of 100.  But it's fine if Thread A
> commits an offset of 60 when Thread B has actually started reading
> from 50 and is now up to 70, if B hasn't committed yet.  Or even if
> Thread B has read up to 59 and committed that.
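
The "never move an offset backwards" rule can be sketched as a check-and-set on a shared offset store. This is a minimal in-memory model: `MonotonicOffsetStore` is a hypothetical name, and its `threading.Lock` only protects threads within one process. Across processes, the same check would need an atomic compare-and-set in the shared store itself, for example a ZooKeeper `setData` call with an expected version, which fails if another writer updated the node first.

```python
import threading
from typing import Dict, Optional, Tuple


class MonotonicOffsetStore:
    """Hypothetical in-memory offset store that rejects backward commits."""

    def __init__(self) -> None:
        self._offsets: Dict[Tuple[str, int], int] = {}
        self._lock = threading.Lock()

    def commit(self, topic: str, partition: int, offset: int) -> bool:
        """Store `offset` unless it is lower than what is already committed."""
        key = (topic, partition)
        with self._lock:  # the read-compare-write must be atomic
            current = self._offsets.get(key)
            if current is not None and offset < current:
                return False  # stale commit from a former owner: ignore it
            self._offsets[key] = offset
            return True

    def get(self, topic: str, partition: int) -> Optional[int]:
        with self._lock:
            return self._offsets.get((topic, partition))


store = MonotonicOffsetStore()

# Case 1: B already committed 100, then A's stale commit of 60 arrives.
store.commit("t", 0, 100)
print(store.commit("t", 0, 60), store.get("t", 0))  # False 100

# Case 2: B is reading from 50 but has not committed; A's 60 is accepted.
store.commit("t", 1, 50)
print(store.commit("t", 1, 60), store.get("t", 1))  # True 60

# Case 3: B read up to 59 and committed it; A's 60 still moves forward.
store.commit("t", 2, 59)
print(store.commit("t", 2, 60), store.get("t", 2))  # True 60
```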
>
> Of course, it's a headache for users to have to track those offsets
> themselves in every worker, but we can provide some tools that do that
> automatically, so users of the API don't need to worry about it.
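
One way such a tool could work: wrap the message loop so that each worker records, per (topic, partition), the position just past the last message it finished processing, and periodically hands those positions to whatever commit call is available. In this sketch `Msg` is a hypothetical stand-in for the partition and offset fields carried by `MessageAndMetadata`, `OffsetTracker` and `run` are illustrative names, and the tracker assumes the committed value is the offset of the next message to read.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Tuple


@dataclass
class Msg:
    # Hypothetical stand-in for the fields MessageAndMetadata exposes.
    topic: str
    partition: int
    offset: int
    payload: bytes


class OffsetTracker:
    """Per-worker record of how far each partition has been processed."""

    def __init__(self) -> None:
        self._next: Dict[Tuple[str, int], int] = {}

    def mark_processed(self, msg: Msg) -> None:
        key = (msg.topic, msg.partition)
        # Keep the highest "next offset to read" seen for this partition.
        self._next[key] = max(self._next.get(key, 0), msg.offset + 1)

    def drain(self) -> Dict[Tuple[str, int], int]:
        """Return the pending positions to commit and reset the tracker."""
        pending, self._next = self._next, {}
        return pending


def run(messages: Iterable[Msg], handle: Callable[[Msg], None],
        tracker: OffsetTracker) -> None:
    for msg in messages:
        handle(msg)                  # process first...
        tracker.mark_processed(msg)  # ...then record, so a failed message is never marked done


tracker = OffsetTracker()
batch = [Msg("t", 0, 1, b"a"), Msg("t", 1, 10, b"b"), Msg("t", 0, 2, b"c")]
run(batch, lambda m: None, tracker)
print(tracker.drain())  # {('t', 0): 3, ('t', 1): 11}
```

Recording only after `handle()` returns is what keeps the scheme "at least once": a crash mid-message leaves that message uncommitted, so the next owner sees it again.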
>
> I swear I'll have my code ready to look at soon -- I think it's coded,
> but I need to clean it up some ...
>
>
> On Fri, Nov 22, 2013 at 5:38 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> > I think the problem is that you don't know which partitions a thread is
> > currently 'owning', and therefore you don't know which partitions you
> > can commit to.
> >
> >
> > On Fri, Nov 22, 2013 at 5:28 PM, Imran Rashid <[EMAIL PROTECTED]> wrote:
> >
> >> I don't think I need control over which partitions are processed by a
> >> thread -- however, I do need the partition --> thread assignment to be
> >> consistent.  I didn't realize that assignment could change; I guess
> >> that means my solution might not work.  It depends a bit on what
> >> happens during reassignment.
> >>
> >> Here's an example: initially Thread A is processing a partition.  It
> >> commits its progress up to offset 50, but keeps reading up to offset 60.
> >> At that point, the partition is reassigned to Thread B (which, I guess,
> >> could be in a completely different process).  Where will Thread B start
> >> reading?  It seems like it definitely can't start reading at 60, because
> >> then you can't guarantee that messages 50-60 ever get processed.  (Not
> >> to mention that it probably doesn't even know A got to 60.)
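
The trade-off in this example can be checked with a small model of the handoff. It assumes A finished processing every offset below `a_processed_up_to`, B processes from `b_start` to the end of the log, and nothing else fails; `handoff` is a hypothetical helper, and the numbers mirror the example above.

```python
from typing import List, Tuple


def handoff(a_processed_up_to: int, b_start: int,
            log_end: int) -> Tuple[List[int], List[int]]:
    """Return (duplicated, missed) offsets when a partition moves from A to B."""
    seen_by_a = set(range(0, a_processed_up_to))
    seen_by_b = set(range(b_start, log_end))
    duplicated = sorted(seen_by_a & seen_by_b)
    missed = sorted(set(range(log_end)) - seen_by_a - seen_by_b)
    return duplicated, missed


# B resumes from A's last commit (50) after A got through offset 59:
# offsets 50..59 are processed twice, nothing is lost (at-least-once holds).
print(handoff(a_processed_up_to=60, b_start=50, log_end=70))

# B resumes from A's read position (60), but A only finished up to 54:
# offsets 55..59 are never processed, so at-least-once is violated.
print(handoff(a_processed_up_to=55, b_start=60, log_end=70))
```

Starting B at the last committed offset can only cause duplicates, never gaps, which is why it is the safe choice when B cannot know how far A really got.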
> >>
> >> But if B starts at 50, then I don't see how each thread committing
> >> its own offsets is a problem.  We just need some rule that you can't
> >> move an offset backwards on a commit (at least, not without some force
> >> option).  Without that check, I suppose B could read up to 100 and
> >> commit, and then A could commit its read up to 60 (which wouldn't
> >> even violate "at least once", but would certainly be undesirable).
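
To see why an unchecked backward commit is undesirable but still "at least once", compare the stored offset with and without the check, and count how much a consumer restarting from it would replay. The `force` flag models the override option mentioned above; `commit` and `replay_count` are hypothetical helpers, and offsets are again treated as the next position to read.

```python
def commit(stored: int, proposed: int, force: bool = False) -> int:
    """Return the offset held after a commit attempt.

    Backward moves are ignored unless force=True (e.g. a deliberate rewind).
    """
    if proposed < stored and not force:
        return stored
    return proposed


def replay_count(stored: int, reached: int) -> int:
    """Messages re-processed by a consumer restarting from `stored`
    when the group had already processed everything below `reached`."""
    return max(0, reached - stored)


after_b = commit(0, 100)                     # B read up to 100 and committed
checked = commit(after_b, 60)                # A's stale commit is ignored
unchecked = commit(after_b, 60, force=True)  # behaves like a store with no check

print(checked, replay_count(checked, 100))      # 100 0
print(unchecked, replay_count(unchecked, 100))  # 60 40
```

Nothing is skipped in either case; the regression only costs the 40 duplicate deliveries between offsets 60 and 99.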
> >>
> >> The fact that A & B could also be reading from other partitions at the
> >> same time doesn't seem like a problem to me; each thread would commit
> >> its positions in all of its partitions.
> >>
> >> What am I overlooking?