Kafka, mail # user - High Level Consumer error handling and clean exit


Chris Curtin 2013-07-09, 15:16
Philip O'Toole 2013-07-09, 15:22
Chris Curtin 2013-07-09, 18:23
Philip O'Toole 2013-07-09, 19:24
Chris Curtin 2013-07-09, 19:54
Chris Curtin 2013-07-09, 20:09
Re: High Level Consumer error handling and clean exit
Ian Friedman 2013-07-09, 21:51
Hey Chris,

The way I handled this in my application using the High Level Consumer was to turn off auto-commit and commit manually after finishing a batch of messages (obviously you could do it after every message, but for my purposes it was better to work in batches).
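
A minimal sketch of that pattern, assuming the 0.8-style Java consumer API (the topic name, ZooKeeper address, group id, BATCH_SIZE, and process() are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");  // placeholder address
    props.put("group.id", "my-group");                 // placeholder group
    props.put("auto.commit.enable", "false");          // commit manually instead

    ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    topicCount.put("my-topic", 1);                     // one stream for this example
    KafkaStream<byte[], byte[]> stream =
        consumer.createMessageStreams(topicCount).get("my-topic").get(0);

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    int inBatch = 0;
    while (it.hasNext()) {
        process(it.next());                            // hypothetical handler
        if (++inBatch >= BATCH_SIZE) {                 // BATCH_SIZE: placeholder
            consumer.commitOffsets();                  // mark the whole batch consumed
            inBatch = 0;
        }
    }

Note that commitOffsets() commits the current position of every stream owned by the connector, which is why the batch boundary is the safe place to call it.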

--
Ian Friedman
On Tuesday, July 9, 2013 at 4:09 PM, Chris Curtin wrote:

> Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966
>
>
>
> On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin <[EMAIL PROTECTED]> wrote:
>
> > Thanks. I know I can write a SimpleConsumer to do this, but it feels like
> > the High Level consumer is _so_ close to being robust enough to handle
> > what I'd think people want to do in most applications. I'm going to submit
> > an enhancement request.
> >
> > I'm trying to understand the level of data loss in this situation, so I
> > looked deeper into the KafkaStream logic: it looks like a KafkaStream
> > includes a BlockingQueue for transferring the messages to my code from
> > Kafka. If I call shutdown() when I detect the problem, are the messages
> > already in the BlockingQueue considered 'read' by Kafka, or does the
> > shutdown peek into the Queue to see what is still there before updating
> > ZooKeeper?
> >
> > My concern is if that queue is not empty I'll be losing more than the one
> > message that led to the failure.
> >
> > I'm also curious how others are handling this situation. Do you assume the
> > message that is causing problems is lost or somehow know to go get it
> > later? I'd think others would have this problem too.
> >
> > Thanks,
> >
> > Chris
> >
> >
> >
> > On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole <[EMAIL PROTECTED]> wrote:
> >
> > > OK.
> > >
> > > It sounds like you're requesting functionality that the high-level
> > > consumer
> > > simply doesn't have. As I am sure you know, there is no API call that
> > > supports "handing back a message".
> > >
> > > I might be missing something, but if you need this kind of control, I
> > > think
> > > you need to code your application differently. You could try creating a
> > > ConsumerConnection per partition (your clients will then need to know the
> > > number of partitions out there). That way commitOffsets() will actually
> > > only apply to that partition. Auto-commit the same way. It might give you
> > > the level of control you need.
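
As a sketch of that suggestion (the actual Java class is ConsumerConnector; this reuses props from the sketch above, and assumes the connector count matches the partition count so rebalancing hands roughly one partition to each connector, which is typical but not guaranteed):

    import java.util.ArrayList;
    import java.util.List;

    List<ConsumerConnector> connectors = new ArrayList<ConsumerConnector>();
    for (int i = 0; i < numPartitions; i++) {      // numPartitions: known out of band
        ConsumerConnector c =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("my-topic", 1);             // one stream per connector
        KafkaStream<byte[], byte[]> stream =
            c.createMessageStreams(topicCount).get("my-topic").get(0);
        connectors.add(c);
        startWorker(stream, c);                    // hypothetical: the worker can call
                                                   // c.commitOffsets() independently,
                                                   // affecting only what c owns
    }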
> > >
> > > Philip
> > >
> > > On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <[EMAIL PROTECTED]> wrote:
> > >
> > > > Hi Philip,
> > > >
> > > > Correct, I don't want to explicitly control the offset committing. The
> > > > ConsumerConnector handles that well enough except for when I want to
> > > > shutdown and NOT have Kafka think I consumed that last message for a
> > > > stream. This isn't the crash case; it is a case where the logic
> > > > consuming the message detects an error and wants to exit cleanly until
> > > > that issue can be resolved, but not lose the message it was trying to
> > > > process.
> > > >
> > > > My understanding is that the commitOffsets() call is across all
> > > > threads, not just for the stream my thread is reading from. So knowing
> > > > it is okay to call this requires coordination across all my threads,
> > > > which makes a High Level Consumer a lot harder to write correctly.
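
One way to get that coordination, as a sketch: park every worker at a message boundary before the connector-wide commit. This assumes NUM_THREADS workers over streams from a single connector, and that all streams see steady traffic; a worker blocked in hasNext() on an idle stream would stall the barrier unless consumer.timeout.ms is set to bound the wait.

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.atomic.AtomicBoolean;

    final AtomicBoolean commitPending = new AtomicBoolean(false);
    final CyclicBarrier atBoundary = new CyclicBarrier(NUM_THREADS,
        new Runnable() {
            public void run() {
                // Runs only once every worker is parked between messages,
                // so no in-flight message gets its offset committed early.
                consumer.commitOffsets();
                commitPending.set(false);
            }
        });

    // Inside each worker's loop:
    while (it.hasNext()) {
        if (commitPending.get()) {
            try {
                atBoundary.await();            // wait for the other workers
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (BrokenBarrierException e) {
                return;                        // another worker bailed out
            }
        }
        process(it.next());                    // hypothetical handler
    }

    // Any thread requests a commit by setting the flag:
    // commitPending.set(true);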
> > > >
> > > > Thinking about what I'd like to happen: my code hands the message back
> > > > to the KafkaStream (or whatever level knows about the consumed offsets)
> > > > and says:
> > > > - set the next start offset for this topic/partition to this message in
> > > > ZooKeeper
> > > > - cleanly shut down the stream from the broker(s)
> > > > - don't force a rebalance on the consumer, since something is wrong with
> > > > the processing of the data in the message, not the message itself.
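
Purely as illustration, a hand-back call might look something like this (hypothetical; no such API exists in the high-level consumer, and KAFKA-966 may end up proposing something different):

    // HYPOTHETICAL -- not a real Kafka API, just the three bullets as code.
    public interface RejectableStream<K, V> {
        /**
         * Rewind the next-start offset for this stream's topic/partition in
         * ZooKeeper to the given message, shut the stream down cleanly, and
         * leave the rest of the consumer group alone (no rebalance).
         */
        void rejectAndShutdown(MessageAndMetadata<K, V> failed);
    }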
 
Chris Curtin 2013-07-10, 11:14