Re: processing a batch of messages in a "transaction"
Jason Rosenberg 2013-11-22, 04:06
Remember, too, that different threads will always be processing different
sets of partitions. No two threads will ever own the same partition.
A consumer connector can own many partitions (split among its threads),
each with a different offset. So, yes, it is complicated, as you say, to
try to get coherent committing when you want to commit batches of messages,
while using multiple threads.
In this case, you would need to make sure that a commit happens only after
all threads have successfully processed a batch of messages (but no more),
and are all waiting for a single commit to start and finish.
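A minimal sketch of that coordination, assuming one partition per worker
thread and using a java.util.concurrent.CyclicBarrier whose barrier action
stands in for the single commit. Nothing here touches Kafka: the class name
BatchCommitSketch, the offsets array, and the commits list are all
hypothetical, and in real 0.8 consumer code the barrier action is roughly
where a call to the connector's commitOffsets() would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical stand-in for a multi-threaded consumer; no Kafka classes are used.
class BatchCommitSketch {

    // Starts `workers` threads; each processes `batches` batches of `batchSize`
    // messages from its own simulated partition. Returns one offsets snapshot
    // per commit, in commit order.
    static List<long[]> run(int workers, int batches, int batchSize) {
        AtomicLongArray offsets = new AtomicLongArray(workers);
        List<long[]> commits = new ArrayList<>();

        // The barrier action runs exactly once per batch, after every worker
        // has arrived and before any worker is released to read more.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            long[] snapshot = new long[workers];
            for (int i = 0; i < workers; i++) {
                snapshot[i] = offsets.get(i);
            }
            commits.add(snapshot); // assumption: the real commit call goes here
        });

        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int partition = w;
            Thread t = new Thread(() -> {
                try {
                    for (int b = 0; b < batches; b++) {
                        for (int m = 0; m < batchSize; m++) {
                            offsets.incrementAndGet(partition); // "process" one message
                        }
                        barrier.await(); // wait for the single commit to finish
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return commits;
    }
}
```

Because every worker blocks in await() until the commit has run, no thread
can advance past the batch boundary while the offsets are being recorded.
The cost is that all threads stall on the slowest one for every batch.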
So, it may be easier to think in terms of not having multiple threads,
etc., and instead limit the number of partitions/topics a single thread
might work on.
It is all pretty complicated, but I think it is so in the name of
high-throughput and performance. But there clearly is room for refactoring
(coming in 0.9!).
On Thu, Nov 21, 2013 at 2:51 PM, Imran Rashid <[EMAIL PROTECTED]> wrote:
> sorry, one more thought --
> I've realized that the difficulties are also because I'm trying to
> guarantee read *exactly* once. The standard consumer group guarantees
> read *at least* once (with more than one read happening for those
> messages that get read, but then the process dies before the offset is
> committed to zookeeper). I guess you probably wouldn't want read
> exactly once w/out batch processing, or at least very low rate of
> messages, to avoid a ton of zookeeper updates. Hopefully this helps
> explain the use case more.
> On Thu, Nov 21, 2013 at 12:19 PM, Imran Rashid <[EMAIL PROTECTED]> wrote:
> > Hi,
> > thanks again for the quick answer on this. However, though this
> > solution works, it is *really* complicated to get the user code
> > correct if you are reading data with more than 1 thread. Before I
> > begin processing one batch of records, I have to make sure all of the
> > workers reading from kafka streams have stopped. However, those
> > workers could be blocked inside an iterator.hasNext(), and could get
> > unblocked at any time in the middle of my batch processing. This
> > makes it really tricky to ensure that the messages that made it into
> > the batch are *exactly* the same as the offsets of all workers at the
> > end of my batch processing, so that commitOffsets() is accurate.
> > But, I think the good news is that this would become much simpler with
> > a small api change. Would it be OK to add another version of
> > commitOffsets, one that takes the offsets as a parameter
> > rather than using whatever the current position is? That way workers
> > could simply snapshot their current offset & their batch, and continue
> > processing. Another thread could process the batch & commit the
> > offsets associated w/ the batch. The coordination issues mostly
> > disappear (and you also might get higher throughput, since workers can
> > keep going during batch processing).
> > Would such a change be reasonable? I could probably submit a patch for
> > On Wed, Nov 20, 2013 at 9:05 AM, Imran Rashid <[EMAIL PROTECTED]> wrote:
> >> perfect, thank you!
> >> On Wed, Nov 20, 2013 at 8:44 AM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
> >>> You can turn off automatic offset commit (auto.commit.enable=false)
> >>> and use
> >>> the commitOffsets() API. Note that this API will commit offsets for all
> >>> partitions owned by the consumer.
> >>> Thanks,
> >>> Neha
> >>> On Nov 20, 2013 6:39 AM, "Imran Rashid" <[EMAIL PROTECTED]> wrote:
> >>>> Hi,
> >>>> I have an application which reads messages from a kafka queue, builds
> >>>> up a batch of messages, and then performs some action on that batch.
> >>>> So far I have just used the ConsumerGroup api. However, I realized
> >>>> there is a potential problem -- my app may die sometime in the middle
> >>>> of the batch, and then those messages could get completely lost.